Source code for hysop.tools.io_utils

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Tools related to i/o in HySoP.

.. currentmodule:: hysop.tools.io_utils

* :class:`~IO`
* :class:`~IOParams`
* :class:`~Writer`
* :class:`~XMF`, tools to prepare/write xmf files.

"""
import os
import sys
import psutil
import warnings
import tempfile
import socket
import shutil
import atexit
import subprocess
import numpy as np
from collections import namedtuple
from inspect import getouterframes, currentframe
from re import findall

from hysop.constants import DirectionLabels
from hysop.tools.htypes import first_not_None, check_instance
from hysop.tools.parameters import MPIParams
from hysop.tools.warning import HysopWarning
from hysop.tools.decorators import requires_cmd
import hysop.core.mpi as mpi


class IO:
    """
    Static class with utilities to set/find the place where i/o files
    will be read/written.
    """

    _default_path = None
    _cache_path = None
    _tmp_dirs = {}

    HDF5 = 998
    """HDF5 format id"""

    ASCII = 997
    """ascii format id"""

    @staticmethod
    @requires_cmd("stat")
    def get_fs_type(path):
        cmd = ["stat", "-f", "-c", "%T", path]
        fs_type = ""
        if mpi.main_rank == 0:
            fs_type = subprocess.check_output(cmd).decode("utf-8")
        fs_type = mpi.main_comm.bcast(fs_type, root=0)
        return fs_type.replace("\n", "")

    @classmethod
    def is_shared_fs(cls, path):
        return cls.get_fs_type(path) in ["nfs"]

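    # Illustrative sketch (not part of the module): get_fs_type shells out to
    # the coreutils 'stat' command on rank 0 and broadcasts the result, so every
    # rank sees the same filesystem type string. The example values below are
    # hypothetical and depend on the host:
    #
    #   IO.get_fs_type("/tmp")        # -> e.g. "tmpfs" on many Linux systems
    #   IO.is_shared_fs("/home/me")   # True only for NFS mounts
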
    @classmethod
    def default_path(cls):
        """Get the current default path used for io.

        Returns
        -------
        string
            the default value of the current i/o path.
        """
        assert cls._default_path is not None, "default path has not been set."
        return cls._default_path

    @classmethod
    def default_ram_path(cls):
        """Get the current default path used for io in memory.

        Returns
        -------
        string
            the default value of the current RAM i/o path.
        """
        try:
            import memory_tempfile
        except ImportError as e:
            print(e)
            msg = "You are trying to use a RAM filesystem but the "
            msg += "'memory_tempfile' module is not present on your system. "
            msg += "Get it from https://gitlab.com/keckj/memory-tempfile."
            raise RuntimeError(msg)
        mt = memory_tempfile.MemoryTempfile(fallback=True)
        if mt.found_mem_tempdir():
            return mt.gettempdir()
        else:
            return None

    @staticmethod
    def check_dir(filepath, io_rank=0, comm=None):
        """Check if the directory 'filepath' exists and create it if not.

        Parameters
        ----------
        filepath : string
            directory path (full or relative)
        io_rank : int
            rank of the mpi process that does the check.
        comm : mpi communicator
            the mpi communicator that does the check.
        """
        # Create output dir if required
        if comm is None:
            comm = mpi.main_comm
        if comm.Get_rank() == io_rank:
            if not os.path.exists(filepath):
                os.makedirs(filepath)

    @staticmethod
    def set_default_path(pathdir):
        """Set a new default path for hysop i/o.

        Parameters
        ----------
        pathdir : string
            the new path
        """
        assert isinstance(pathdir, str)
        IO._default_path = pathdir
        IO.check_dir(IO._default_path)

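    # Illustrative sketch (not part of the module): typical default-path setup
    # at the start of a simulation script. The directory name is a hypothetical
    # example; any writable path works and is created if needed:
    #
    #   from hysop.tools.io_utils import IO
    #   IO.set_default_path("./hysop_results")
    #   assert IO.default_path() == "./hysop_results"
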
    @classmethod
    def default_cache_path(cls):
        from hysop import get_env

        home = os.path.expanduser("~")
        tmp = tempfile.gettempdir()
        candidates = [get_env("CACHE_DIR", None), f"{home}/.cache", home, f"{tmp}"]
        cpath = None
        for c in candidates:
            if c is None:
                continue
            elif c == home:
                cpath = f"{home}/.hysop"
            else:
                cpath = f"{c}/hysop"
            cpath += f"/{socket.gethostname()}"
            break
        if cpath is None:
            msg = "No suitable caching directory was found in {}."
            msg = msg.format(candidates)
            raise RuntimeError(msg)
        cpath = f"{cpath}/python{sys.version_info.major}_{sys.version_info.minor}"
        if not os.path.exists(cpath):
            try:
                if mpi.main_rank == 0:
                    os.makedirs(cpath)
            except OSError:
                pass
        return cpath

    @staticmethod
    def cache_path():
        if IO._cache_path is None:
            IO.set_cache_path(IO.default_cache_path())
        return IO._cache_path

    @classmethod
    def ram_path(cls):
        return cls.default_ram_path()

    @classmethod
    def get_tmp_dir(cls, key):
        """
        Create or get an existing temporary directory.
        """
        if key in cls._tmp_dirs:
            tmp_dir = cls._tmp_dirs[key]
        else:
            tmp_dir = tempfile.mkdtemp()
            cls._tmp_dirs[key] = tmp_dir
        return tmp_dir

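    # Illustrative sketch (not part of the module): temporary directories are
    # memoized per key, so repeated calls with the same key reuse one directory;
    # all of them are removed at interpreter exit via atexit (see end of file).
    #
    #   d1 = IO.get_tmp_dir("scratch")
    #   d2 = IO.get_tmp_dir("scratch")
    #   assert d1 == d2   # same key -> same directory
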
    @classmethod
    def _remove_tmp_dirs(cls):
        for f in cls._tmp_dirs.values():
            shutil.rmtree(f, ignore_errors=True, onerror=None)

    @classmethod
    def set_cache_path(cls, path):
        if cls.is_shared_fs(path):
            hostname = socket.gethostname()
            if hostname not in path:
                new_path = f"{path}/{hostname}"
                msg = "\nSpecified cache path '{}' is stored on a network filesystem "
                msg += "which does not correctly support file locking."
                msg += "\nSetting cache_path to '{}'."
                msg = msg.format(path, new_path)
                warnings.warn(msg, HysopWarning)
                path = new_path
        IO._cache_path = path
        IO.check_dir(path)

    @staticmethod
    def set_datasetname(field_name, topo, direction=None):
        """Return the dataset name of a given continuous field,
        saved for a given topology.
        """
        val = field_name + "_" + str(topo.get_id())
        if direction is not None:
            val += DirectionLabels[direction]
        return val

    @staticmethod
    def get_datasetnames(filename):
        """Return the list of dataset names present in an hdf input file.

        Parameters
        ----------
        filename : string
            hdf file

        Returns
        -------
        a list of strings
        """
        import h5py

        hdf_file = h5py.File(filename, "r")
        # Materialize the keys view before closing the file, otherwise
        # it becomes invalid once the file is closed.
        keys = list(hdf_file.keys())
        hdf_file.close()
        return keys

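# Illustrative sketch (not part of the module): the dataset naming convention
# used by IO.set_datasetname. 'topo' stands for any object exposing get_id();
# for a field "velocity" on a topology with id 1 and direction 0, the name is
# "velocity_1" + DirectionLabels[0] (e.g. "velocity_1X" if that label is 'X'):
#
#   name = IO.set_datasetname("velocity", topo, direction=0)
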
class IOParams(
    namedtuple(
        "IOParams",
        [
            "filename",
            "filepath",
            "frequency",
            "fileformat",
            "dump_times_fp32",
            "dump_times_fp64",
            "dump_tstart",
            "dump_tend",
            "dump_func",
            "io_leader",
            "visu_leader",
            "with_last",
            "enable_ram_fs",
            "force_ram_fs",
            "dump_is_temporary",
            "postprocess_dump",
            "append",
            "hdf5_disable_compression",
            "hdf5_disable_slicing",
            "disk_filepath",
            "kwds",
        ],
    )
):
    """
    A struct to handle I/O file parameters.

    Parameters
    ----------
    filename : string
        Name of the file (absolute or relative path)
    filepath : string
        Location of the file
    frequency : int
        Frequency of output or input (e.g. every N time steps)
    fileformat : int
        Format of the file. See notes for available formats. Default=HDF5.
    dump_times : tuple of floats
        Extra dump times that should be used to dump in addition to
        frequency (double precision)
    dump_tstart : float
        Start to dump at given time. Defaults to -np.inf (no time constraint).
    dump_tend : float
        Stop to dump at given time. Defaults to +np.inf (no time constraint).
    dump_func : function
        Generic function to compute the should_dump result.
    with_last : boolean
        Should dump when the iteration is the last one.
    io_leader : int
        Rank of the mpi process dealing with the io. Default is 0.
    visu_leader : int
        Rank of the mpi process dealing with the graphical io. Default is 0.
    enable_ram_fs : bool
        Instruct the dumper to write directly to RAM, falling back to
        filepath/filename when this is not possible.
    force_ram_fs : bool
        Force the dumper to write directly to RAM, and raise an error
        when this is not possible (filepath/filename are ignored).
        Implies enable_ram_fs.
    dump_is_temporary : bool
        Instruct the dumper to delete dumped data from disk or RAM after
        the postprocessing script has been called.
        Implies that a postprocessing script is supplied.
    postprocess_dump : str
        Path to a postprocessing script that will be called after dump.
        See hysop/tools/postprocess_dump.sh for an example of
        post processing script.
    hdf5_disable_compression : bool
        Disable compression for HDF5 outputs (when available).
        Can be used to accelerate in RAM postprocessing.
    hdf5_disable_slicing : bool
        Disable slicing for HDF5 outputs (when available).
        May reduce performance but avoids hdf5 file fragmentation.
    append : bool, optional
        Tell if appended (on xmf files, when using hdf format)
    kwds : dict
        Custom extra keyword arguments to pass to operators

    See examples in hysop.operator.hdf_io

    Notes
    -----
    Format parameter must be one of the following :
      - :class:`~IO.HDF5`
      - :class:`~IO.ASCII`
    """

    def __new__(
        cls,
        filename,
        filepath=None,
        frequency=1,
        fileformat=None,
        dump_times=None,
        dump_tstart=None,
        dump_tend=None,
        dump_func=None,
        io_leader=0,
        visu_leader=0,
        with_last=False,
        enable_ram_fs=False,
        force_ram_fs=False,
        dump_is_temporary=False,
        postprocess_dump=None,
        hdf5_disable_compression=False,
        hdf5_disable_slicing=False,
        append=False,
        **kwds,
    ):
        dump_tstart = first_not_None(dump_tstart, -np.inf)
        dump_tend = first_not_None(dump_tend, +np.inf)
        fileformat = first_not_None(fileformat, IO.HDF5)
        dump_times = first_not_None(dump_times, ())

        check_instance(filename, str, allow_none=True)
        check_instance(filepath, str, allow_none=True)
        check_instance(frequency, int)
        check_instance(dump_times, tuple, values=(float, np.float64))
        check_instance(dump_tstart, (int, float, np.float64))
        check_instance(dump_tend, (int, float, np.float64))
        check_instance(io_leader, int)
        check_instance(visu_leader, int)
        check_instance(with_last, bool)
        check_instance(enable_ram_fs, bool)
        check_instance(force_ram_fs, bool)
        check_instance(dump_is_temporary, bool)
        check_instance(postprocess_dump, str, allow_none=True)
        check_instance(hdf5_disable_compression, bool)
        check_instance(hdf5_disable_slicing, bool)
        check_instance(append, bool)
        if dump_func:
            assert callable(dump_func), "given function must be callable"
            assert (
                dump_func.__code__.co_argcount
            ), "given function must take one arg (the simulation object)"

        frequency = int(frequency)
        dump_tstart = float(dump_tstart)
        dump_tend = float(dump_tend)
        io_leader = int(io_leader)
        visu_leader = int(visu_leader)
        dump_times_fp64 = tuple(map(np.float64, dump_times))
        dump_times_fp32 = tuple(map(np.float32, dump_times))

        if force_ram_fs:
            enable_ram_fs = True

        try:
            ram_path = IO.ram_path()
        except RuntimeError:
            if force_ram_fs:
                raise
            else:
                ram_path = None

        disk_filepath = None
        if enable_ram_fs and (ram_path is not None):
            if filename:
                assert not os.path.isabs(filename), filename
            disk_filepath = filepath
            filepath = ram_path

        # If filename is an absolute path, the filepath arg is ignored.
        if filename:
            if os.path.isabs(filename):
                filepath = os.path.dirname(filename)
            else:
                if filepath is not None:
                    filename = os.path.join(filepath, filename)
                    filepath = os.path.abspath(os.path.dirname(filename))
                else:
                    filepath = os.path.dirname(filename)
                    if filepath == "":
                        # Get default output path
                        filepath = IO.default_path()
                        filename = os.path.join(filepath, filename)
                    else:
                        filepath = os.path.abspath(filepath)
                        filename = os.path.join(
                            filepath, os.path.basename(filename)
                        )
        elif filepath:
            filepath = os.path.abspath(filepath)
        else:
            filepath = IO.default_path()
        IO.check_dir(filepath)

        if disk_filepath is None:
            disk_filepath = filepath

        if dump_is_temporary:
            msg = "Dump is temporary but no postprocessing script has been supplied."
            assert postprocess_dump is not None, msg

        return super().__new__(
            cls,
            filename,
            filepath,
            frequency,
            fileformat,
            dump_times_fp32,
            dump_times_fp64,
            dump_tstart,
            dump_tend,
            dump_func,
            io_leader,
            visu_leader,
            with_last,
            enable_ram_fs,
            force_ram_fs,
            dump_is_temporary,
            postprocess_dump,
            append,
            hdf5_disable_compression,
            hdf5_disable_slicing,
            disk_filepath,
            kwds,
        )

    def should_dump(self, simulation):
        if self.dump_func is not None:
            return self.dump_func(simulation)
        frequency = self.frequency
        t = simulation.t()
        dump = (frequency >= 0) and (self.with_last and simulation._next_is_last)
        if t > self.dump_tend + simulation.tol:
            return dump
        if (frequency >= 0) and simulation.is_time_of_interest:
            if isinstance(t, np.float32):
                dump |= t in self.dump_times_fp32
            elif isinstance(t, np.float64):
                dump |= t in self.dump_times_fp64
            else:
                raise NotImplementedError(type(t))
        if frequency > 0:
            dump |= (simulation.current_iteration % frequency) == 0
        return dump

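    # Illustrative sketch (not part of the module): how should_dump combines
    # its criteria, assuming a 'simulation' object (hypothetical here) exposing
    # t(), tol, is_time_of_interest, current_iteration and _next_is_last.
    # With frequency=10 and dump_times=(0.5,), a dump happens every 10th
    # iteration, at t == 0.5 (when flagged as a time of interest), and never
    # after dump_tend.
    #
    #   params = IOParams(filename="fields.h5", frequency=10, dump_times=(0.5,))
    #   do_it = params.should_dump(simulation)
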
    def clone(self, **kwds):
        keys = (
            "filename",
            "frequency",
            "fileformat",
            "dump_times",
            "dump_tstart",
            "dump_tend",
            "dump_func",
            "io_leader",
            "visu_leader",
            "with_last",
            "enable_ram_fs",
            "force_ram_fs",
            "dump_is_temporary",
            "postprocess_dump",
            "hdf5_disable_compression",
            "hdf5_disable_slicing",
            "append",
            "kwds",
        )

        # 'filepath' is also accepted; it is handled separately below.
        diff = set(kwds.keys()).difference(keys + ("filepath",))
        if diff:
            msg = f"Unknown parameters {diff} for class {self.__class__.__name__}."
            raise ValueError(msg)

        all_kwds = {}
        for k in keys:
            if k == "kwds":
                # flatten the extra keyword arguments
                for kk, vv in kwds.get(k, getattr(self, k)).items():
                    all_kwds[kk] = vv
            else:
                all_kwds[k] = kwds.get(k, getattr(self, k))
        all_kwds["filepath"] = kwds.get("filepath", getattr(self, "disk_filepath"))

        return IOParams(**all_kwds)

    @property
    def dump_times(self):
        return self.dump_times_fp64

    def __str__(self):
        return self.to_string()

    def to_string(self, prefix=""):
        ss = """filename:      {}
filepath:      {}
fileformat:    {}
frequency:     {}
dump_times:    {}
dump_tstart:   {}
dump_tend:     {}
dump_func:     {}
io_leader:     {}
visu_leader:   {}
enable_ram_fs: {}
force_ram_fs:  {}
dump_is_tmp:   {}
post_process:  {}
hdf5_no_compr: {}
hdf5_no_slice: {}
append:        {}
extra_kwds:    {}""".format(
            self.filename,
            self.filepath,
            self.fileformat,
            self.frequency,
            self.dump_times,
            self.dump_tstart,
            self.dump_tend,
            self.dump_func,
            self.io_leader,
            self.visu_leader,
            self.enable_ram_fs,
            self.force_ram_fs,
            self.dump_is_temporary,
            self.postprocess_dump,
            self.hdf5_disable_compression,
            self.hdf5_disable_slicing,
            self.append,
            self.kwds,
        )
        return prefix + ("\n" + prefix).join(ss.split("\n"))

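# Illustrative sketch (not part of the module): building and cloning IOParams.
# clone() reuses every stored field except those overridden; note that the
# clone is rebuilt from disk_filepath, so any RAM-fs redirection is recomputed.
# Filenames below are hypothetical.
#
#   p = IOParams(filename="op.h5", frequency=5, dump_tend=10.0)
#   q = p.clone(frequency=1)       # same file settings, dump every iteration
#   print(q.to_string(prefix="  "))
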
class Writer:
    """
    To write data from a 2D numpy array into an ascii file.

    Examples
    --------
    >>> from hysop.tools.io_utils import IOParams, IO, Writer
    >>> params = IOParams(filename='r.dat', fileformat=IO.ASCII)
    >>> wr = Writer(params, buffshape=(1, 2))
    >>> ite = 3  # current iteration number
    >>> if wr.do_write(ite):
    ...     wr.buffer[...] = 3.
    ...     wr.write()
    >>> wr.finalize()

    result : buffer is written into r.dat
    """

    def __init__(self, io_params, buffshape=None, mpi_params=None, safe_io=True):
        """
        Parameters
        ----------
        io_params : hysop.tools.io_utils.IOParams
            Setup for file output (name, location ...)
        buffshape : tuple
            2D numpy.array.shape like tuple, shape of the output/input buffer.
        mpi_params : hysop.tools.parameters.MPIParams
            Mpi setup (comm that owns the writer)
        safe_io : boolean
            True --> open/close file every time data are written.
            False --> open at init and close during finalize.
            Costs less, but if the simulation crashes, data are lost.
        """
        # Absolute path + name for the i/o file.
        # Note that if filename contains an absolute path,
        # filepath is ignored.
        msg = "wrong type for io_params arg."
        assert isinstance(io_params, IOParams), msg
        assert io_params.fileformat == IO.ASCII
        self.io_params = io_params

        # A reference communicator, just to identify a
        # process rank for io.
        if mpi_params is None:
            mpi_params = MPIParams()
        else:
            msg = "wrong type for mpi_params arg."
            assert isinstance(mpi_params, MPIParams), msg
        self.mpi_params = mpi_params

        # Check if output dir exists, create it if not.
        IO.check_dir(
            self.io_params.filepath, self.io_params.io_leader, self.mpi_params.comm
        )

        # Shape of the output buffer (must be a 2D numpy array)
        if buffshape is None:
            buffshape = (1, 1)
        self._buffshape = buffshape
        assert (
            len(self._buffshape) == 2
        ), "2D shape required : set arg buffshape as a 2D tuple: (x,y)"

        # The buffer (numpy array) that will be printed to a file
        from hysop.tools.numpywrappers import npw

        self.buffer = npw.zeros(self._buffshape)
        " buffer used to save printed data"

        # Defines how often the output file is written:
        # True --> open/close file every time data are written.
        # False --> open at init and close during finalize.
        # Costs less, but if the simulation crashes, data are lost.
        if safe_io:
            self.write = self._fullwrite
        else:
            self.write = self._partialwrite

        # Force synchro to be sure that all output dirs
        # have been created.
        self.mpi_params.comm.barrier()
        if self.mpi_params.rank == self.io_params.io_leader:
            self._file = open(self.io_params.filename, "w")

    def do_write(self, ite):
        """Returns true if output is required for iteration ite.

        Parameters
        ----------
        ite : int
            current iteration number
        """
        num = ite + 1  # number of iterations done
        rk = self.mpi_params.rank
        return rk == self.io_params.io_leader and (num % self.io_params.frequency) == 0

    def _ft_write(self):
        """Write a two-dim. NumPy array in tabular form to the file object."""
        # Function taken from scitools
        # fastest version (of the write family of functions) so far...
        # written by Mario Pernici <Mario.Pernici@mi.infn.it>
        fileobj, a = self._file, self.buffer
        if len(a.shape) != 2:
            raise TypeError("a 2D array is required, shape now is " + str(a.shape))
        N = 512
        shape0 = a.shape[0]
        shape1 = a.shape[1]
        str_fmt = "%g\t" * (shape1 - 1) + "%g\n"
        # use a big format string
        str_fmt_N = str_fmt * N
        for i in range(shape0 // N):
            # take the i-th chunk of N rows
            a1 = a[i * N : (i + 1) * N, :]
            # put a1 in 1D array form; ravel better than reshape for
            # non-contiguous arrays.
            a1 = np.ravel(a1)
            fileobj.write(str_fmt_N % tuple(a1))
        for i in range(shape0 - shape0 % N, shape0):
            fileobj.write(str_fmt % tuple(a[i]))

    def _fullwrite(self):
        """open, write and close"""
        self._file = open(self.io_params.filename, "a")
        self._ft_write()
        self._file.close()

    def _partialwrite(self):
        """just write, no open, nor close"""
        self._ft_write()

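    # Illustrative sketch (not part of the module): the tabular layout produced
    # by _ft_write. Each buffer row becomes one line of tab-separated "%g"
    # values, so a (2, 3) buffer of zeros is written as:
    #
    #   0\t0\t0
    #   0\t0\t0
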
    def finalize(self):
        """close the file, if required"""
        if self.mpi_params.rank == self.io_params.io_leader:
            if not self._file.closed:
                self._file.close()

    def __str__(self):
        if self.mpi_params.rank == self.io_params.io_leader:
            s = " === Writer === \n"
            s += " - filename = " + self.io_params.filename
            s += "\n - buffshape = " + str(self._buffshape)
            s += "\n - frequ = " + str(self.io_params.frequency)
            return s
        # __str__ must return a string on every rank
        return ""

class XMF:
    """Static class - Tools to prepare and write xmf files."""

    @staticmethod
    def _list_format(l):
        """Format a list to the xml output.
        Removes the '[]()' and replaces ',' with ' ' in default str.

        Parameters
        ----------
        l : list to format
        """
        buff = str(l).replace(",", " ").replace("[", "")
        return buff.replace("]", "").replace("(", "").replace(")", "")

    @staticmethod
    def prepare_grid_attributes(
        dataset_names, resolution, origin, step, joinrkfiles=None
    ):
        """
        Prepare XDMF header as a string.

        Parameters
        ----------
        dataset_names : list
            all dataset names
        resolution : 3d tuple
        origin : 3d tuple
        step : 3d tuple
        joinrkfiles : (optional)

        Returns
        -------
        string
            the xml-like header, formattable with the following keywords:
            niteration : iteration number
            time : time in seconds
            filename : target file name, in sequential or with parallel
            hdf5 support
            filename0, ..., filenameN : target file names for each rank
            0 to N, in parallel without parallel hdf5 support
            resolution0, ..., resolutionN : local resolutions for each rank
            0 to N, in parallel without parallel hdf5 support
        """
        # The header (xml-like), saved in a string.
        # Always use a 3D mesh because paraview does not like 2D meshes
        # (uses axes (Y,Z) instead of (X,Y)).
        xml_grid = ""
        topo_type = "3DCORECTMesh"
        geo_type = "ORIGIN_DXDYDZ"
        xml_grid += '   <Grid Name="Iteration {}"'.format("{niteration:06d}")
        xml_grid += ' GridType="Uniform">\n'
        xml_grid += '    <Time Value="{}" />\n'.format("{time}")
        xml_grid += '    <Topology TopologyType="' + str(topo_type) + '"'
        xml_grid += ' NumberOfElements="'
        xml_grid += XMF._list_format(resolution) + ' "/>\n'
        xml_grid += '    <Geometry GeometryType="' + geo_type + '">\n'
        xml_grid += '     <DataItem Dimensions="' + str(3) + ' "'
        xml_grid += ' NumberType="Float" Precision="8" Format="XML">\n'
        xml_grid += "      " + XMF._list_format(origin) + "\n"
        xml_grid += "     </DataItem>\n"
        xml_grid += '     <DataItem Dimensions="' + str(3) + ' "'
        xml_grid += ' NumberType="Float" Precision="8" Format="XML">\n'
        xml_grid += "      " + XMF._list_format(step) + "\n"
        xml_grid += "     </DataItem>\n"
        xml_grid += "    </Geometry>\n"
        # Append dataset parameters
        for name in dataset_names:
            xml_grid += '    <Attribute Name="'
            xml_grid += name + '"'
            xml_grid += ' AttributeType="Scalar" Center="Node">\n'
            if joinrkfiles is None:
                xml_grid += '     <DataItem Dimensions="'
                xml_grid += XMF._list_format(resolution) + ' "'
                xml_grid += ' NumberType="Float" Precision="8" Format="HDF"'
                xml_grid += ' Compression="Raw">\n'
                xml_grid += "      {filename}"
                xml_grid += ":/" + name
                xml_grid += "\n     </DataItem>\n"
            else:
                xml_grid += '     <DataItem Dimensions="'
                xml_grid += XMF._list_format(resolution) + ' "'
                xml_grid += ' ItemType="Function" Function="JOIN('
                xml_grid += " ; ".join("$" + str(i) for i in joinrkfiles)
                xml_grid += ')">\n'
                for i in joinrkfiles:
                    xml_grid += '      <DataItem Dimensions="'
                    xml_grid += "{resolution" + str(i) + "}" + ' "'
                    xml_grid += ' NumberType="Float" Precision="8" Format="HDF"'
                    xml_grid += ' Compression="Raw">\n'
                    xml_grid += "       {filename" + str(i) + "}"
                    xml_grid += ":/" + name
                    xml_grid += "\n      </DataItem>\n"
                xml_grid += "     </DataItem>\n"
            xml_grid += "    </Attribute>\n"
        xml_grid += "   </Grid>\n"
        return xml_grid

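# Illustrative sketch (not part of the module): the header returned by
# prepare_grid_attributes is itself a format string; the caller fills the
# remaining placeholders. Names and values below are hypothetical.
#
#   header = XMF.prepare_grid_attributes(
#       ["velocity_1X"], resolution=(65, 65, 65),
#       origin=(0.0, 0.0, 0.0), step=(0.1, 0.1, 0.1))
#   xml = header.format(niteration=42, time=0.5, filename="dump_00042.h5")
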
atexit.register(IO._remove_tmp_dirs)